package com.atguigu.gmall.realtime.common.base;

import com.atguigu.gmall.realtime.common.constant.Constant;
import com.atguigu.gmall.realtime.common.util.FlinkSourceUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

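/**
 * Template base class for the real-time Flink jobs in this module. {@link #start} prepares the
 * execution environment, (optionally) configures checkpointing, consumes the given Kafka topic,
 * and then delegates the business logic to {@link #handle}, which concrete subclasses implement.
 *
 * <p>A minimal usage sketch; the subclass name, port, group id, and topic below are illustrative
 * only and not defined by this class:
 * <pre>{@code
 * public class ExampleApp extends BaseApp {
 *     public static void main(String[] args) {
 *         new ExampleApp().start(10011, 4, "example_app", "topic_db");
 *     }
 *
 *     public void handle(StreamExecutionEnvironment env, DataStreamSource<String> kafkaStrDS) {
 *         kafkaStrDS.print();
 *     }
 * }
 * }</pre>
 */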
public abstract class BaseApp {
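    /**
     * Builds the environment, consumes the topic, and submits the job.
     *
     * @param port          REST port for the local Flink web UI
     * @param parallelism   default parallelism of the job
     * @param ckAndGroupId  Kafka consumer group id; the name suggests it also identifies the job's checkpoints
     * @param topic         Kafka topic to consume
     */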
    public void start(int port, int parallelism, String ckAndGroupId, String topic) {
        // TODO 1. Prepare the environment
        // 1.1 Create the stream execution environment
        Configuration conf = new Configuration();
        conf.set(RestOptions.PORT, port);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // 1.2 Set the parallelism
        env.setParallelism(parallelism);
        // TODO 2. Checkpoint-related settings
        // 2.1 Enable checkpointing
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // 2.2 Set the checkpoint timeout
        //checkpointConfig.setCheckpointTimeout(60000L);
        // 2.3 Retain checkpoints after the job is cancelled
        //checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 Set the minimum pause between two checkpoints
        //checkpointConfig.setMinPauseBetweenCheckpoints(2000L);
        // 2.5 Set the restart strategy
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
        // 2.6 Set the state backend
        //env.setStateBackend(new HashMapStateBackend());
        // 2.7 Set the checkpoint storage path
        //checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/ck/");
        // 2.8 Set the Hadoop user for HDFS access
        //System.setProperty("HADOOP_USER_NAME", "atguigu");
        // TODO 3. Read data from the Kafka topic
        // 3.1 Create the KafkaSource consumer
        KafkaSource<String> kafkaSource = FlinkSourceUtil.getKafkaSource(topic, ckAndGroupId);
        // 3.2 Consume the data and wrap it in a stream
        DataStreamSource<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_Source");

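        // TODO 4. Business-specific processing, delegated to the concrete subclass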
        handle(env, kafkaStrDS);

        // TODO 5. Submit the job
        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

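    /**
     * Business-specific processing on the Kafka source stream, implemented by concrete subclasses.
     *
     * @param env        the stream execution environment, for registering further operators and sinks
     * @param kafkaStrDS the raw String stream consumed from the Kafka topic
     */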
    public abstract void handle(StreamExecutionEnvironment env, DataStreamSource<String> kafkaStrDS);

}
